package com.isyscore.os.etl.service.impl;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.isyscore.boot.login.LoginUserManagerImpl;
import com.isyscore.device.common.model.RespDTO;
import com.isyscore.device.common.util.JsonMapper;
import com.isyscore.os.core.entity.Resource;
import com.isyscore.os.core.exception.DataFactoryException;
import com.isyscore.os.core.exception.ErrorCode;
import com.isyscore.os.core.mapper.JobConfigMapper;
import com.isyscore.os.core.model.entity.JobConfig;
import com.isyscore.os.core.model.entity.JobRunLog;
import com.isyscore.os.core.model.entity.RowSQL;
import com.isyscore.os.core.sqlcore.MySQLBuilder;
import com.isyscore.os.core.util.InitiallyUtils;
import com.isyscore.os.etl.config.WaitForPoolConfig;
import com.isyscore.os.etl.constant.CommonConstant;
import com.isyscore.os.etl.constant.JobConstant;
import com.isyscore.os.etl.manager.FlinkSqlExecutorJarManager;
import com.isyscore.os.etl.manager.MysqlDataSourceManager;
import com.isyscore.os.etl.manager.QuartzJobManager;
import com.isyscore.os.etl.model.Insert;
import com.isyscore.os.etl.model.StatusAndLogHandler;
import com.isyscore.os.etl.model.Update;
import com.isyscore.os.etl.model.dto.*;
import com.isyscore.os.etl.model.enums.DeployModeEnum;
import com.isyscore.os.etl.model.enums.JobConfigStatus;
import com.isyscore.os.etl.model.enums.JobRunStatus;
import com.isyscore.os.etl.model.enums.JobType;
import com.isyscore.os.etl.model.flinkviewable.FlinkSQLTemplate;
import com.isyscore.os.etl.model.flinkviewable.Mapping;
import com.isyscore.os.etl.model.flinkviewable.TypeConvert;
import com.isyscore.os.etl.model.form.JobViewable;
import com.isyscore.os.etl.service.*;
import com.isyscore.os.etl.utils.*;
import com.isyscore.os.permission.entity.LoginVO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.validation.BeanPropertyBindingResult;
import org.springframework.validation.SmartValidator;
import org.springframework.web.client.HttpClientErrorException;

import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;

import static com.isyscore.os.etl.constant.JobConstant.*;

/**
 * 任务配置实现类
 *
 * @author wuwx
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class JobConfigServiceImpl extends ServiceImpl<JobConfigMapper, JobConfig> implements JobConfigService {

    // Persists and queries per-run execution logs for jobs.
    private final JobRunLogService jobRunLogService;


    // Archives historical versions of job configurations.
    private final JobConfigHistoryService jobConfigHistoryService;

    // Flink REST endpoint ("host:port") from configuration; used as the default
    // address when a job does not run in a remote deploy mode (see stopJob).
    @Value("${flink-config.host}:${flink-config.port}")
    private String localFlinkHttpAddress;

    // Flink client installation directory (presumably for CLI submission — not used in this chunk).
    @Value("${flink-config.client-home}")
    private String clientHome;

    // Directory for generated SQL files (per property name — not used in this chunk).
    @Value("${flink-config.sql-path}")
    private String sqlPath;

    // Directory holding user-defined-function jars (per property name — not used in this chunk).
    @Value("${flink-config.udf-path}")
    private String udfPath;

    // Pool on which ROW_SQL / Flink jobs are executed asynchronously.
    @Autowired
    @Qualifier("jobExecutor")
    private ThreadPoolExecutor jobExecutor;

    // Pool presumably used for asynchronous status polling — not used in this chunk.
    @Autowired
    @Qualifier("statusExecutor")
    private ThreadPoolExecutor statusExecutor;

    // Retry count. NOTE(review): mutable non-final static — if never reassigned,
    // prefer `private static final int RETRY = 3`.
    private static Integer RETRY = 3;

    // Quartz wrapper used to add/pause/delete scheduled job triggers.
    private final QuartzJobManager quartzJobManager;

    // Supplies the current login user / tenant for datasource lookups and scheduling.
    private final LoginUserManagerImpl loginUserManager;

    // Access to configured datasources (source/target of ROW_SQL jobs).
    private final DatasourceService datasourceService;

    // Opens and closes raw JDBC connections for MySQL datasources.
    private final MysqlDataSourceManager mysqlDataSourceManager;

    // Manages the Flink SQL executor jar (not used in this chunk).
    private final FlinkSqlExecutorJarManager flinkSqlExecutorJarManager;

    // @Lazy — likely to avoid a circular dependency with ResourceService; confirm.
    @Autowired
    @Lazy
    private ResourceService resourceService;

    // Fallback tenant id (usage not visible in this chunk).
    public static final String DEFAULT_TENANT_ID = "initially";
    // Display name of the shared Flink dependency resource.
    public static final String FLINK_COMMON_LIB_NAME = "FLINK 通用依赖";

    /**
     * Best-effort removal of the Quartz trigger for the given job id. A failure
     * is logged but deliberately not propagated, so a manual stop request never
     * fails the caller.
     *
     * @param id job configuration id whose scheduled trigger should be removed
     */
    @Override
    public void manuallyStop(String id) {
        try {
            quartzJobManager.deleteJob(id);
        } catch (DataFactoryException e) {
            // Pass the exception as the last argument so SLF4J records the stack
            // trace (the original dropped it entirely).
            log.error("手动关闭调度失败{}", id, e);
        }
    }

    /**
     * Creates a new job configuration. For a pre-checked ROW_SQL job the INSERT
     * template is derived from the column mappings and stored in the
     * {@code flinkSql} column. The response always carries the new id; the
     * embedded code/message reflect whether the SQL pre-check succeeded.
     *
     * @param jobConfig the configuration to persist
     * @return success response containing code, message and the new job id
     */
    @Override
    public RespDTO addJob(JobConfig jobConfig) {
        // 2022-04-02 产品要求去掉任务的任务名重复校验 (duplicate job-name check intentionally removed)
        checkJobConfigParam(jobConfig);
        boolean isRowSql = JobType.ROW_SQL.getCode() == jobConfig.getJobType();
        if (isRowSql && Objects.equals(jobConfig.getIsChecked(), CommonConstant.TRUE)) {
            List<Mapping> mappings = JsonMapper.fromJsonToList(jobConfig.getMappings(), Mapping.class);
            if (CollectionUtils.isEmpty(mappings)) {
                throw new DataFactoryException(ErrorCode.MISSING_PARAMS, "字段映射信息");
            }
            // The generated INSERT template (select -> insert) lives in the flinkSql column.
            jobConfig.setFlinkSql(buildTemplate(jobConfig));
        }
        this.save(jobConfig);
        // Report PRECHECK_ERROR when the SQL pre-check failed, OK otherwise.
        ErrorCode outcome = Objects.equals(jobConfig.getIsChecked(), CommonConstant.FALSE)
                ? ErrorCode.PRECHECK_ERROR
                : ErrorCode.OK;
        return RespDTO.onSuc(ImmutableMap.of(
                "code", outcome.getCode(),
                "message", outcome.getMessage(),
                "id", jobConfig.getId()
        ));
    }

    /**
     * Builds a parameterized INSERT statement for a ROW_SQL job from its target
     * table and JSON-encoded column mappings, e.g.
     * {@code INSERT INTO t (a, b) VALUES (?, ?)}.
     *
     * @param jobConfig configuration carrying the target table and mappings JSON
     * @return the INSERT SQL template with JDBC {@code ?} placeholders
     * @throws DataFactoryException if the mappings JSON is missing or empty
     */
    public static String buildTemplate(JobConfig jobConfig) {
        List<Mapping> mappings = JsonMapper.fromJsonToList(jobConfig.getMappings(), Mapping.class);
        // Guard here as well so the method is safe when invoked outside the
        // call sites that already validate the mappings.
        if (mappings == null || mappings.isEmpty()) {
            throw new DataFactoryException(ErrorCode.MISSING_PARAMS, "字段映射信息");
        }
        MySQLBuilder builder = new MySQLBuilder();
        builder.insert(jobConfig.getTargetTable());
        mappings.forEach(mapping -> builder.addValue(mapping.getTargetCol(), "?"));
        // The builder quotes values, producing '?'; unquote to real JDBC
        // placeholders. Plain replace() suffices — replaceAll() would compile a
        // regex for a literal substitution.
        return builder.build().replace("'?'", "?");
    }

    /**
     * Validates the incoming job configuration, throwing a
     * {@link DataFactoryException} ({@code MISSING_PARAMS} / {@code PARAM_ERROR})
     * when a required field is absent or malformed.
     *
     * <p>ROW_SQL jobs go through bean validation (RowSQL group) plus a live SQL
     * pre-check; all other job types are validated field by field.</p>
     *
     * @param jobConfig the configuration to validate
     * @author zhan9yn
     * @date 2021/12/28 4:48 下午
     */
    public void checkJobConfigParam(JobConfig jobConfig) {
        // NOTE(review): getJobType() is unboxed for this comparison; a null job
        // type would throw NPE here — confirm callers always set it.
        if (JobType.ROW_SQL.getCode() == jobConfig.getJobType()) {
            // Run bean validation for the RowSQL group and aggregate all field
            // errors into a single "field: message" list.
            BeanPropertyBindingResult result = new BeanPropertyBindingResult(jobConfig, "config");
            validator.validate(jobConfig, result, RowSQL.class);
            if (result.hasErrors()) {
                String res = result.getFieldErrors().stream().map(error -> error.getField() + ": " + error.getDefaultMessage()).collect(Collectors.joining("; "));
                log.error("组件参数校验错误{}", res);
                throw new DataFactoryException(ErrorCode.MISSING_PARAMS, res);
            }
            //校验 SQL — executes the raw SQL against the source and sets isChecked.
            preCheckSQL(jobConfig);
            return;
        }
        // All non-ROW_SQL jobs require an explicit deploy mode.
        if (jobConfig.getDeployMode() == null) {
            throw new DataFactoryException(ErrorCode.MISSING_PARAMS, "运行模式");
        }
        // STANDALONE mode additionally needs a Flink run configuration.
        if (jobConfig.getDeployMode().equals(DeployModeEnum.STANDALONE.name()) &&
                StringUtils.isEmpty(jobConfig.getFlinkRunConfig())) {
            throw new DataFactoryException(ErrorCode.MISSING_PARAMS, "flinkRunConfig");
        }

        // A scheduled job must carry a cron expression.
        if (Objects.equals(jobConfig.getIsScheduled(), CommonConstant.TRUE)
                && StringUtils.isEmpty(jobConfig.getCron())) {
            throw new DataFactoryException(ErrorCode.PARAM_ERROR);
        }

        if (JobType.JAR.getCode() == jobConfig.getJobType()) {
            // JAR jobs: main class plus an http(s) jar URL are mandatory.
            if (StringUtils.isEmpty(jobConfig.getCustomMainClass())) {
                throw new DataFactoryException(ErrorCode.MISSING_PARAMS, "customMainClass");
            }
            if (StringUtils.isEmpty(jobConfig.getCustomJarUrl())) {
                throw new DataFactoryException(ErrorCode.MISSING_PARAMS, "customJarUrl");
            }
            if (!MatcherUtils.isHttpsOrHttp(jobConfig.getCustomJarUrl())) {
                throw new DataFactoryException(ErrorCode.PARAM_ERROR);
            }
        } else if (JobType.SQL_STREAMING.getCode() == jobConfig.getJobType() ||
                JobType.SQL_BATCH.getCode() == jobConfig.getJobType()) {
            // SQL jobs: flinkSql is mandatory; any extra jar paths must be http(s) URLs.
            if (StringUtils.isEmpty(jobConfig.getFlinkSql())) {
                throw new DataFactoryException(ErrorCode.MISSING_PARAMS, "flinkSql");
            }
            if (StringUtils.isNotEmpty(jobConfig.getExtJarPath())) {
                String[] urls = jobConfig.getExtJarPath().split(",");
                for (String url : urls) {
                    if (StringUtils.isEmpty(url)) {
                        continue;
                    }
                    if (!MatcherUtils.isHttpsOrHttp(url)) {
                        throw new DataFactoryException(ErrorCode.PARAM_ERROR);
                    }
                }
            }
        } else {
            // Unknown job type for this branch (VIEW/ROW_SQL handled above).
            throw new DataFactoryException(ErrorCode.PARAM_ERROR);
        }
    }

    /**
     * Pre-checks the raw SQL of a ROW_SQL job by executing it as a query
     * against the source datasource. Sets {@code isChecked} on the config:
     * TRUE on success, FALSE on failure or when the SQL is not a SELECT.
     *
     * @param jobConfig configuration carrying rowSql / sourceId / sourceDatabase
     * @return the error message when validation failed, otherwise {@code null}
     */
    public String preCheckSQL(JobConfig jobConfig) {
        String err = null;
        String sql = jobConfig.getRowSql();
        // Only plain SELECT statements are eligible for pre-checking; the null
        // guard avoids an NPE on a config without rowSql.
        if (sql != null && sql.trim().toUpperCase().startsWith("SELECT")) {
            Connection sourceCon = null;
            Statement statement = null;
            try {
                DataSourceDTO source = datasourceService.get(Long.parseLong(jobConfig.getSourceId()), loginUserManager.getCurrentTenantId()).getData();
                // BUGFIX: null-check BEFORE dereferencing; the original called
                // setSelectDatabaseName first, throwing NPE instead of the
                // intended DATA_NOT_FOUND.
                if (source == null) {
                    throw new DataFactoryException(ErrorCode.DATA_NOT_FOUND, "数据源不存在");
                }
                source.setSelectDatabaseName(jobConfig.getSourceDatabase());
                sourceCon = mysqlDataSourceManager.getConn(source);
                statement = sourceCon.createStatement();
                statement.executeQuery(jobConfig.getRowSql());
                jobConfig.setIsChecked(CommonConstant.TRUE);
            } catch (Throwable e) {
                err = e.getMessage();
                // Pass the throwable as the last argument (no '{}') so SLF4J
                // records the full stack trace.
                log.error("sql 预校验错误", e);
                jobConfig.setIsChecked(CommonConstant.FALSE);
            } finally {
                mysqlDataSourceManager.close(sourceCon, statement, null);
            }
        } else {
            jobConfig.setIsChecked(CommonConstant.FALSE);
        }
        return err;
    }

    /**
     * Updates a job configuration from the given DTO. When a job name is
     * supplied, it is validated (uniqueness) before the update is applied.
     *
     * @param jobConfigDTO the new configuration values
     * @return success response
     */
    @Override
    public RespDTO updateJobConfigById(JobConfigDTO jobConfigDTO) {
        String jobName = jobConfigDTO.getJobName();
        if (StringUtils.isNotEmpty(jobName)) {
            this.checkJobName(jobName, jobConfigDTO.getId());
        }
        this.updateById(JobConfigDTO.toEntity(jobConfigDTO));
        return RespDTO.onSuc();
    }

    /**
     * Updates an existing job configuration after validating its parameters and
     * current lifecycle state. A starting/running or scheduled job cannot be
     * edited. For pre-checked ROW_SQL jobs the INSERT template is regenerated;
     * otherwise only the Flink run configuration is updated.
     *
     * @param jobConfig the new configuration values
     * @return success with the job id, or a failure code
     */
    @Override
    public RespDTO updateJobConfig(JobConfig jobConfig) {
        // BUGFIX: the original returned JOB_DOES_NOT_EXISTS when a matching row
        // WAS found (count > 0); the job is missing when the count is zero.
        // NOTE(review): the lookup key is getJobId() while updates below use
        // getId() — confirm that is intended.
        if (this.count(Wrappers.lambdaQuery(new JobConfig()).eq(JobConfig::getId, jobConfig.getJobId())) == 0) {
            return RespDTO.onFail(ErrorCode.JOB_DOES_NOT_EXISTS);
        }
        checkJobConfigParam(jobConfig);
        JobConfigStatus status = JobConfigStatus.getJobConfigStatus(jobConfig.getStatus());
        // A job that is starting or running must not be edited.
        if (JobConfigStatus.STARTING.equals(status) || JobConfigStatus.RUNNING.equals(status)) {
            return RespDTO.onFail(ErrorCode.JOB_STATUS_ERROR);
        }
        if (Objects.equals(CommonConstant.TRUE, jobConfig.getIsScheduled())) {
            throw new DataFactoryException(ErrorCode.JOB_IS_SCHEDULING);
        }

        if (JobType.ROW_SQL.getCode() == jobConfig.getJobType() && Objects.equals(jobConfig.getIsChecked(), CommonConstant.TRUE)) {
            List<Mapping> mappings = JsonMapper.fromJsonToList(jobConfig.getMappings(), Mapping.class);
            if (mappings == null || mappings.isEmpty()) {
                throw new DataFactoryException(ErrorCode.MISSING_PARAMS, "字段映射信息");
            }
            // Regenerate the INSERT template; it is persisted in the flinkSql column.
            jobConfig.setFlinkSql(buildTemplate(jobConfig));
            this.updateById(jobConfig);
        } else {
            // Typed wrapper (the original used a raw LambdaUpdateWrapper).
            LambdaUpdateWrapper<JobConfig> update = Wrappers.<JobConfig>lambdaUpdate()
                    .eq(JobConfig::getId, jobConfig.getId())
                    .set(JobConfig::getFlinkRunConfig, jobConfig.getFlinkRunConfig());
            this.update(jobConfig, update);
        }
        if (Objects.equals(jobConfig.getIsChecked(), CommonConstant.FALSE)) {
            return RespDTO.onFail(ErrorCode.PRECHECK_ERROR);
        }
        return RespDTO.onSuc(ImmutableMap.of("id", jobConfig.getId()));
    }

    /**
     * Opens (locks) a job configuration: an open job can be run but can no
     * longer be edited.
     *
     * <p>Note: the Javadoc was previously placed between {@code @Override} and
     * the declaration, where the javadoc tool does not attach it.</p>
     *
     * @param id job configuration id
     * @return success response
     * @throws DataFactoryException if no job with the given id exists
     * @author zhan9yn
     * @date 2021/12/29 2:23 下午
     */
    @Override
    public RespDTO openJob(String id) {
        JobConfig jobConfig = Optional.ofNullable(getById(id)).orElseThrow(() -> new DataFactoryException(ErrorCode.JOB_DOES_NOT_EXISTS));
        jobConfig.setIsOpen(CommonConstant.TRUE);
        this.updateById(jobConfig);
        return RespDTO.onSuc();
    }

    /**
     * Closes (unlocks) a job configuration so it can be edited again.
     *
     * <p>Note: the Javadoc was previously placed between {@code @Override} and
     * the declaration, where the javadoc tool does not attach it.</p>
     *
     * @param id job configuration id
     * @return success response
     * @throws DataFactoryException if no job with the given id exists
     * @author zhan9yn
     * @date 2021/12/29 2:23 下午
     */
    @Override
    public RespDTO closeJob(String id) {
        JobConfig jobConfig = Optional.ofNullable(getById(id)).orElseThrow(() -> new DataFactoryException(ErrorCode.JOB_DOES_NOT_EXISTS));
        jobConfig.setIsOpen(CommonConstant.FALSE);
        this.updateById(jobConfig);
        return RespDTO.onSuc();
    }

    /**
     * Deletes a job configuration together with its run logs. Deletion is
     * refused while the job is starting/running or still scheduled.
     *
     * @param id job configuration id
     * @return success response
     * @throws DataFactoryException if the job is missing, active, or scheduled
     */
    @Override
    public RespDTO deleteJobConfigById(String id) {
        JobConfig jobConfig = Optional.ofNullable(getById(id))
                .orElseThrow(() -> new DataFactoryException(ErrorCode.JOB_DOES_NOT_EXISTS));
        JobConfigStatus status = JobConfigStatus.getJobConfigStatus(jobConfig.getStatus());
        boolean active = JobConfigStatus.RUNNING.equals(status) || JobConfigStatus.STARTING.equals(status);
        if (active) {
            throw new DataFactoryException(ErrorCode.JOB_STATUS_ERROR);
        }
        if (Objects.equals(CommonConstant.TRUE, jobConfig.getIsScheduled())) {
            throw new DataFactoryException(ErrorCode.JOB_IS_SCHEDULING);
        }
        this.removeById(id);
        jobRunLogService.deleteLogByConfigId(id);
        return RespDTO.onSuc();
    }

    /**
     * Loads a job configuration by id and maps it to its DTO form.
     *
     * @param id job configuration id (must not be null)
     * @return the DTO for the stored configuration
     * @throws DataFactoryException if the id is null or no such job exists
     */
    @Override
    public JobConfigDTO getJobConfigById(String id) {
        if (id == null) {
            throw new DataFactoryException(ErrorCode.MISSING_PARAMS, "id");
        }
        JobConfig entity = Optional.ofNullable(this.getById(id))
                .orElseThrow(() -> new DataFactoryException(ErrorCode.JOB_DOES_NOT_EXISTS));
        return JobConfigDTO.toDTO(entity);
    }

    /**
     * Starts a job immediately (outside the scheduler) after verifying the
     * pre-check state for VIEW / ROW_SQL jobs and that the job is not currently
     * under scheduling.
     *
     * @param id job configuration id
     * @return success response
     * @throws DataFactoryException if the job is missing, unchecked, or scheduled
     */
    @Override
    public RespDTO startJob(String id) {
        JobConfig jobConfig = Optional.ofNullable(getById(id))
                .orElseThrow(() -> new DataFactoryException(ErrorCode.JOB_DOES_NOT_EXISTS));
        JobType type = JobType.getJobType(jobConfig.getJobType());
        boolean notChecked = Objects.equals(jobConfig.getIsChecked(), NO_CHECKED);
        if (notChecked && Objects.equals(type, JobType.VIEW)) {
            throw new DataFactoryException(ErrorCode.NO_CHECKED);
        }
        if (notChecked && Objects.equals(type, JobType.ROW_SQL)) {
            throw new DataFactoryException(ErrorCode.MYSQL_ROWSQL_CHECKED);
        }
        if (Objects.equals(jobConfig.getIsScheduled(), CommonConstant.TRUE)) {
            throw new DataFactoryException(ErrorCode.JOB_IS_SCHEDULING);
        }
        startJobInternal(jobConfig, true);
        return RespDTO.onSuc();
    }

    /**
     * Turns scheduling on or off for a job.
     *
     * <p>isScheduled = FALSE: pauses and deletes the Quartz trigger, then marks
     * the config unscheduled. isScheduled = TRUE: validates the cron expression
     * and registers the Quartz trigger. Any other value is rejected.</p>
     *
     * @param id          job configuration id
     * @param isScheduled CommonConstant.TRUE to start, FALSE to stop scheduling
     * @throws DataFactoryException on missing job, invalid state/cron, or
     *                              Quartz failures
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void schedule(String id, Integer isScheduled) {
        JobConfig jobConfig = Optional.ofNullable(getById(id))
                .orElseThrow(() -> new DataFactoryException(ErrorCode.JOB_DOES_NOT_EXISTS));
        if (Objects.equals(CommonConstant.FALSE, isScheduled)) {
            // Stop scheduling — only valid when currently scheduled.
            if (Objects.equals(CommonConstant.FALSE, jobConfig.getIsScheduled())) {
                log.error("当前工作流{}##{}，无法停止调度", jobConfig.getId(), jobConfig.getIsScheduled());
                throw new DataFactoryException(ErrorCode.CANNOT_STOP);
            }
            try {
                log.info("定时任务{}#{}停止：{}", JobConstant.JOB_GROUP, jobConfig.getId(), LocalDateTime.now());
                quartzJobManager.pauseJob(jobConfig.getId());
                quartzJobManager.deleteJob(jobConfig.getId());
                log.info("定时任务{}#{}停止完成：{}", JobConstant.JOB_GROUP, jobConfig.getId(), LocalDateTime.now());
                jobConfig.setIsScheduled(CommonConstant.FALSE);
                this.updateById(jobConfig);
            } catch (DataFactoryException e) {
                // BUGFIX: pass the throwable as the final argument (no '{}') so
                // SLF4J records the stack trace.
                log.error("定时任务停止失败", e);
                throw new DataFactoryException(ErrorCode.STOP_FAILED);
            }
        } else if (Objects.equals(CommonConstant.TRUE, isScheduled)) {
            // Start scheduling — only valid when not already scheduled.
            if (Objects.equals(CommonConstant.TRUE, jobConfig.getIsScheduled())) {
                throw new DataFactoryException(ErrorCode.ALREADY_SCHEDULED);
            }
            String cron = jobConfig.getCron();
            if (!CronUtils.isValid(cron)) {
                throw new DataFactoryException(ErrorCode.CRON_VALID);
            }
            try {
                quartzJobManager.addJob(id, cron, loginUserManager.getCurrentLoginUser(), false);
                jobConfig.setIsScheduled(CommonConstant.TRUE);
                this.updateById(jobConfig);
            } catch (Exception e) {
                log.error("定时任务开启失败", e);
                throw new DataFactoryException(ErrorCode.SCHEDULED_FAILED);
            }
        } else {
            throw new DataFactoryException(ErrorCode.PARAM_ERROR);
        }
    }

    /**
     * Executes a ROW_SQL job asynchronously: reads rows from the source
     * datasource with the job's raw SQL and batch-inserts them into the target
     * table using the pre-built INSERT template stored in {@code flinkSql}.
     * The config status and the run log are set to RUNNING before the copy and
     * to FINISHED/SUCCESS or FAILED afterwards.
     *
     * @param jobConfig   the ROW_SQL job configuration
     * @param jobRunLogId id of the run-log row created for this execution
     */
    public void execSqlJob(JobConfig jobConfig, String jobRunLogId) {
        // Capture the login context on the caller thread and propagate it to
        // the worker thread.
        LoginVO current = loginUserManager.getCurrentLoginUser();
        jobExecutor.execute(() -> {
            loginUserManager.setThreadLocalLoginUser(current);
            JobRunLog jobRunLog = jobRunLogService.getById(jobRunLogId);
            boolean isSuccess = true;
            // Guard against a null localLog — new StringBuilder(null) throws NPE.
            StringBuilder logAppender = new StringBuilder(Objects.toString(jobRunLog.getLocalLog(), ""));
            Connection sourceCon = null;
            Connection targetCon = null;
            Statement statement = null;
            PreparedStatement ps = null;
            ResultSet res = null;
            // Mark job and run log as RUNNING before doing any work.
            jobConfig.setStatus(JobConfigStatus.RUNNING.getCode());
            jobRunLog.setJobStatus(JobRunStatus.RUNNING.name());
            jobRunLogService.updateById(jobRunLog);
            updateById(jobConfig);
            try {
                DataSourceDTO source = datasourceService.get(Long.parseLong(jobConfig.getSourceId()), current.getTenantId()).getData();
                DataSourceDTO target = datasourceService.get(Long.parseLong(jobConfig.getTargetId()), current.getTenantId()).getData();
                // BUGFIX: null-check BEFORE dereferencing; the original called
                // setSelectDatabaseName first and would NPE instead of raising
                // the intended DATA_NOT_FOUND errors.
                if (source == null) {
                    throw new DataFactoryException(ErrorCode.DATA_NOT_FOUND, "数据源不存在");
                }
                if (target == null) {
                    throw new DataFactoryException(ErrorCode.DATA_NOT_FOUND, "目标数据源不存在");
                }
                source.setSelectDatabaseName(jobConfig.getSourceDatabase());
                target.setSelectDatabaseName(jobConfig.getTargetDatabase());
                sourceCon = mysqlDataSourceManager.getConn(source);
                targetCon = mysqlDataSourceManager.getConn(target);
                List<Mapping> mappings = JsonMapper.fromJsonToList(jobConfig.getMappings(), Mapping.class);
                statement = sourceCon.createStatement();
                res = statement.executeQuery(jobConfig.getRowSql());
                ps = targetCon.prepareStatement(jobConfig.getFlinkSql());
                // Copy each source row into a batched INSERT; the i-th mapping
                // feeds the i-th (1-based) JDBC placeholder.
                while (res.next()) {
                    ps.clearParameters();
                    for (int i = 0; i < mappings.size(); i++) {
                        Object value = res.getObject(mappings.get(i).getSourceCol());
                        ps.setObject(i + 1, value);
                    }
                    ps.addBatch();
                }
                ps.executeBatch();
            } catch (Exception e) {
                isSuccess = false;
                // Log with stack trace (no '{}' for the throwable) and append
                // the message to the run log shown to the user.
                log.error("执行原生SQL任务失败", e);
                logAppender.append('\n').append("执行原生SQL任务失败：\n").append(e.getMessage()).append('\n');
            } finally {
                // Release JDBC resources in both directions.
                mysqlDataSourceManager.close(sourceCon, statement, res);
                mysqlDataSourceManager.close(targetCon, ps, null);
                logAppender.append('\n').append(LocalDateTime.now()).append("任务执行完毕");
                jobRunLog.setLocalLog(logAppender.toString());
                // Persist the final state of both the config and the run log.
                if (isSuccess) {
                    jobConfig.setStatus(JobConfigStatus.FINISHED.getCode());
                    jobRunLog.setJobStatus(JobRunStatus.SUCCESS.name());
                } else {
                    jobConfig.setStatus(JobConfigStatus.FAILED.getCode());
                    jobRunLog.setJobStatus(JobRunStatus.FAILED.name());
                }
                jobRunLogService.updateById(jobRunLog);
                updateById(jobConfig);
            }
        });
    }


    /**
     * Shared start path: marks the config STARTING, records a run log, then
     * dispatches either to the ROW_SQL executor or to an asynchronous Flink
     * submission.
     *
     * @param jobConfig the job to start
     * @param isChecked whether to run the pre-start check before submission
     * @return success response
     */
    public RespDTO startJobInternal(JobConfig jobConfig, boolean isChecked) {
        // Update only id + status so no other columns are touched.
        JobConfig statusUpdate = new JobConfig();
        statusUpdate.setId(jobConfig.getId());
        statusUpdate.setStatus(JobConfigStatus.STARTING.getCode());
        updateById(statusUpdate);
        String jobRunLogId = insertJobRunLog(jobConfig);

        if (JobType.ROW_SQL.getCode() == jobConfig.getJobType()) {
            execSqlJob(jobConfig, jobRunLogId);
            return RespDTO.onSuc();
        }
        // Optional pre-start validation, then verify Flink is reachable before
        // submitting asynchronously.
        if (isChecked) {
            this.checkStart(jobConfig);
        }
        flinkPingTest(jobConfig, jobRunLogId);
        aSyncExecJob(getJobRunParamDTO(jobConfig), jobConfig, jobRunLogId);
        return RespDTO.onSuc();
    }

    /**
     * Synchronous start path (scheduler thread): validates the job's pre-check
     * state and lifecycle status, removes a stale Quartz trigger if the job is
     * no longer scheduled, then submits the job and waits for readiness.
     *
     * @param jobId id of the job configuration to run
     * @throws DataFactoryException if the job is missing, unchecked, or active
     */
    public void startJobInternalSyn(String jobId) {
        log.info("--> 同步执行任务 <--");
        JobConfig jobConfig = getById(jobId);
        if (jobConfig == null) {
            throw new DataFactoryException(ErrorCode.JOB_DOES_NOT_EXISTS);
        }
        // BUGFIX: the original compared the raw Integer job type against JobType
        // enum constants, so both guards below were always false. Resolve the
        // enum first (same pattern as startJob).
        JobType type = JobType.getJobType(jobConfig.getJobType());
        if (Objects.equals(type, JobType.VIEW) && Objects.equals(jobConfig.getIsChecked(), NO_CHECKED)) {
            throw new DataFactoryException(ErrorCode.NO_CHECKED);
        }
        if (Objects.equals(type, JobType.ROW_SQL) && Objects.equals(jobConfig.getIsChecked(), NO_CHECKED)) {
            throw new DataFactoryException(ErrorCode.MYSQL_ROWSQL_CHECKED);
        }
        JobConfigStatus status = JobConfigStatus.getJobConfigStatus(jobConfig.getStatus());
        if (JobConfigStatus.RUNNING.equals(status) ||
                JobConfigStatus.STARTING.equals(status)) {
            throw new DataFactoryException(ErrorCode.JOB_STATUS_ERROR);
        }
        if (!Objects.equals(CommonConstant.TRUE, jobConfig.getIsScheduled())) {
            // BUGFIX: supply the argument for the '{}' placeholder (was missing).
            log.info("任务#{}不在调度中，删除当前调度任务", jobConfig.getId());
            try {
                quartzJobManager.deleteJob(jobConfig.getId());
            } catch (DataFactoryException e) {
                // Throwable as last argument so the stack trace is logged.
                log.error("定时任务删除失败", e);
            }
        }

        LambdaUpdateWrapper<JobConfig> wrapper = Wrappers.lambdaUpdate(jobConfig);
        this.update(wrapper.eq(JobConfig::getId, jobConfig.getId()).set(JobConfig::getStatus, JobConfigStatus.STARTING.getCode()));
        String jobRunLogId = insertJobRunLog(jobConfig);
        if (JobType.ROW_SQL.getCode() == jobConfig.getJobType()) {
            execSqlJob(jobConfig, jobRunLogId);
            return;
        }
        flinkPingTest(jobConfig, jobRunLogId);
        JobRunParamDTO jobRunParamDTO = getJobRunParamDTO(jobConfig);
        readyToSubmitJob(jobRunParamDTO, jobConfig, jobRunLogId);
    }

    /**
     * Entry point used by the scheduler to (re)start a scheduled job run:
     * removes a stale trigger when the job is no longer marked as scheduled,
     * records a run log, marks the config STARTING, and submits the job.
     *
     * @param jobId id of the job configuration to run
     * @throws DataFactoryException if the job does not exist
     */
    public void resumeSchedule(String jobId) {
        log.info("##任务开始##");
        JobConfig jobConfig = getById(jobId);
        if (jobConfig == null) {
            throw new DataFactoryException(ErrorCode.JOB_DOES_NOT_EXISTS);
        }
        if (!Objects.equals(CommonConstant.TRUE, jobConfig.getIsScheduled())) {
            // BUGFIX: supply the argument for the '{}' placeholder (was missing).
            log.info("任务#{}不在调度中，删除当前调度任务", jobConfig.getId());
            try {
                quartzJobManager.deleteJob(jobConfig.getId());
            } catch (DataFactoryException e) {
                // Throwable as last argument so the stack trace is logged.
                log.error("定时任务删除失败", e);
            }
        }
        String jobRunLogId = insertJobRunLog(jobConfig);
        LambdaUpdateWrapper<JobConfig> wrapper = Wrappers.lambdaUpdate(jobConfig);
        this.update(wrapper.eq(JobConfig::getId, jobConfig.getId()).set(JobConfig::getStatus, JobConfigStatus.STARTING.getCode()));
        if (JobType.ROW_SQL.getCode() == jobConfig.getJobType()) {
            execSqlJob(jobConfig, jobRunLogId);
            return;
        }
        flinkPingTest(jobConfig, jobRunLogId);
        JobRunParamDTO jobRunParamDTO = getJobRunParamDTO(jobConfig);
        readyToSubmitJob(jobRunParamDTO, jobConfig, jobRunLogId);
    }

    /**
     * Stops a RUNNING job. SQL jobs are simply marked CANCELED; Flink jobs are
     * looked up on the cluster first, then either canceled on the Flink side or
     * force-synchronized to CANCELED in the database.
     *
     * @param id id of the job configuration to stop
     * @return success, or JOB_DOES_NOT_EXISTS when the config is missing
     * @throws DataFactoryException CANNOT_STOP when the job is not RUNNING,
     *                              PARAM_ERROR when the cluster address cannot be parsed
     */
    @Override
    public RespDTO stopJob(String id) {
        JobConfig jobConfig = this.getById(id);
        if (jobConfig == null) {
            return RespDTO.onFail(ErrorCode.JOB_DOES_NOT_EXISTS);
        }
        // Only RUNNING jobs may be stopped.
        if (!Objects.equals(jobConfig.getStatus(), JobConfigStatus.RUNNING.getCode())) {
            throw new DataFactoryException(ErrorCode.CANNOT_STOP);
        }

        // Plain SQL jobs have no Flink-side counterpart: just flip the status.
        if (JobType.ROW_SQL.getCode() == jobConfig.getJobType()) {
            jobConfig.setStatus(JobConfigStatus.CANCELED.getCode());
            updateById(jobConfig);
            return RespDTO.onSuc();
        }
        flinkPingTest(jobConfig,null);
        String flinkHttpAddress = localFlinkHttpAddress;

        // Non-LOCAL deployments carry the cluster address in flinkRunConfig ("-m host:port").
        if (!DeployModeEnum.LOCAL.name().equals(jobConfig.getDeployMode())) {
            flinkHttpAddress = CommandUtils.parseFlinkHttpAddress(jobConfig.getFlinkRunConfig());
            if (flinkHttpAddress == null) {
                log.error("解析flink集群地址出错，请检查是否-m指定flink集群地址");
                throw new DataFactoryException(ErrorCode.PARAM_ERROR, "flinkRunConfig");
            }
        }
        JobStandaloneInfoDTO jobStandaloneInfoDTO = null;
        try {
            jobStandaloneInfoDTO = getJobInfoForStandaloneByAppId(jobConfig.getJobId(),
                    flinkHttpAddress);
        } catch (Exception e) {
            // The lookup currently returns 404 here; on any failure force the
            // stored status to CANCELED.
            jobConfig.setStatus(JobConfigStatus.CANCELED.getCode());
            this.updateById(jobConfig);
        }
        // The job cannot be found on the Flink side while the DB still says
        // RUNNING — force the stored status to CANCELED.
        if (jobStandaloneInfoDTO == null || StringUtils.isNotEmpty(jobStandaloneInfoDTO.getErrors()) || StringUtils.isEmpty(jobStandaloneInfoDTO.getState())) {
            log.error(String.format("getJobInfoForStandaloneByAppId error，jobStandaloneInfo=%s", jobStandaloneInfoDTO));
            jobConfig.setStatus(JobConfigStatus.CANCELED.getCode());
            this.updateById(jobConfig);
        } else {
            // The job exists on the Flink side.
            if (!JobRunStatus.FAILED.name().equals(jobStandaloneInfoDTO.getState()) &&
                    !JobRunStatus.FINISHED.name().equals(jobStandaloneInfoDTO.getState()) &&
                    !JobRunStatus.CANCELED.name().equals(jobStandaloneInfoDTO.getState())) {
                // Still active: cancel it on the Flink side; StatusAndLogHandler
                // then persists the resulting state.
                this.cancelJobForFlinkByAppId(jobConfig);
            } else {
                // DB says RUNNING while Flink reports a terminal state;
                // StatusAndLogHandler normally reconciles this.
                // NOTE(review): getStatus() here is compared with the
                // JobConfigStatus enum constant itself — this only matches if
                // the DTO's status field holds the enum (as checkStandalone
                // suggests); confirm the DTO's status type.
                if (Objects.equals(this.getJobConfigById(id).getStatus(), JobConfigStatus.RUNNING)) {
                    jobConfig.setStatus(JobConfigStatus.CANCELED.getCode());
                    this.updateById(jobConfig);
                }
            }
        }
        return RespDTO.onSuc();
    }



    /**
     * Reconciles the status of every RUNNING/STARTING job against the Flink
     * cluster, using each job's run log. (2021.08.30 — log-sync fix.)
     *
     * @throws DataFactoryException LOG_NOT_FOUND when a checked job has no run log
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void checkJobStatus() {
        List<JobConfigDTO> activeConfigs = JobConfigDTO.toListDTO(list(
                new LambdaQueryWrapper<JobConfig>().in(JobConfig::getStatus, JobConfigStatus.RUNNING.getCode(), JobConfigStatus.STARTING.getCode())));
        List<JobRunLogDTO> allRunLogs = JobRunLogDTO.toListDTO(jobRunLogService.list());
        if (CollectionUtils.isEmpty(activeConfigs) || CollectionUtils.isEmpty(allRunLogs)) {
            return;
        }
        for (JobConfigDTO config : activeConfigs) {
            final String configId = config.getId();
            // Pick the first run log belonging to this job config.
            JobRunLogDTO runLog = allRunLogs.stream()
                    .filter(l -> Objects.equals(l.getJobConfigId(), configId))
                    .findFirst()
                    .orElseThrow(() -> new DataFactoryException(ErrorCode.LOG_NOT_FOUND));
            this.checkStandalone(config, runLog);
        }
    }

    /**
     * Submits the job asynchronously on the job executor, propagating the
     * current login user into the worker thread's thread-local context.
     */
    public void aSyncExecJob(JobRunParamDTO jobRunParamDTO, JobConfig jobConfig,
                             String jobRunLogId) {
        // Capture the caller's identity before switching threads.
        final LoginVO loginUser = loginUserManager.getCurrentLoginUser();
        Runnable submission = () -> {
            loginUserManager.setThreadLocalLoginUser(loginUser);
            readyToSubmitJob(jobRunParamDTO, jobConfig, jobRunLogId);
        };
        jobExecutor.execute(submission);
    }

    /**
     * Queries the Flink REST API ({@code /jobs/{appId}}) for a standalone job's info.
     *
     * @param appId            Flink-side job id; null/empty short-circuits to null
     * @param flinkHttpAddress cluster address, with or without an http(s) scheme
     * @return the parsed job info; a DTO carrying only {@code errors} on a 4xx
     *         response; null when the appId is empty, the response is empty, or
     *         an unexpected error occurs
     */
    public JobStandaloneInfoDTO getJobInfoForStandaloneByAppId(String appId, String flinkHttpAddress) {
        if (StringUtils.isEmpty(appId)) {
            return null;
        }
        String res = null;
        JobStandaloneInfoDTO jobStandaloneInfoDTO = null;
        try {
            // Prepend a scheme when the configured address is a bare host[:port].
            String base = (flinkHttpAddress.startsWith("http://") || flinkHttpAddress.startsWith("https://"))
                    ? flinkHttpAddress
                    : "http://" + flinkHttpAddress;
            String url = HttpUtils.buildUrl(base, "/jobs/" + appId);
            log.debug(String.format("[getJobInfoForStandaloneByAppId]请求参数 appId=%s url=%s", appId, url));
            res = HttpUtils.buildRestTemplate(HttpUtils.TIME_OUT_1_M).getForObject(url, String.class);
            log.debug(String.format("[getJobInfoForStandaloneByAppId]请求参数结果: res=%s", res));
            if (StringUtils.isEmpty(res)) {
                return null;
            }
            return JSON.parseObject(res, JobStandaloneInfoDTO.class);
        } catch (HttpClientErrorException e) {
            // 4xx from the Flink REST API (typically 404 when the job is gone):
            // surface it via the errors field so callers can react.
            jobStandaloneInfoDTO = new JobStandaloneInfoDTO();
            jobStandaloneInfoDTO.setErrors(e.getMessage());
            // FIX: this catch handles an HTTP client error, not a JSON failure —
            // the original logged the misleading message "json异常" here.
            log.error(String.format("flink接口请求异常，res=%s", res), e);
        } catch (Exception e) {
            log.error(String.format("json异常，res=%s", res), e);
        }

        return jobStandaloneInfoDTO;
    }

    /**
     * Fetches up to 10 recent exceptions for a job from the Flink REST API
     * ({@code /jobs/{jobId}/exceptions}).
     *
     * @param jobId            Flink-side job id
     * @param flinkHttpAddress cluster address, with or without an http(s) scheme
     * @return the raw JSON response, or null when the response is empty or the
     *         request fails
     */
    public String getExceptions(@NotEmpty String jobId, @NotEmpty String flinkHttpAddress) {
        String res = null;
        String url;
        try {
            // Prepend a scheme when the configured address is a bare host[:port].
            if (flinkHttpAddress.startsWith("http://") || flinkHttpAddress.startsWith("https://")) {
                url = HttpUtils.buildUrl(flinkHttpAddress, "/jobs/" + jobId);
            } else {
                url = HttpUtils.buildUrl("http://" + flinkHttpAddress, "/jobs/" + jobId);
            }
            url += "/exceptions?maxExceptions=10";
            log.debug(String.format("[getExceptions]请求参数 appId=%s url=%s", jobId, url));
            res = HttpUtils.buildRestTemplate(HttpUtils.TIME_OUT_1_M).getForObject(url, String.class);
            log.debug(String.format("[getExceptions]请求参数结果: res=%s", res));
            if (StringUtils.isEmpty(res)) {
                return null;
            }
        } catch (Exception e) {
            // FIX: pass the exception as the throwable argument so the stack
            // trace is logged, instead of formatting it into a {} placeholder.
            log.error("请求异常", e);
        }
        return res;
    }

    /**
     * Cancels a job on the Flink side via the REST API
     * ({@code /jobs/{jobId}/yarn-cancel}). Failures are logged but not rethrown;
     * StatusAndLogHandler later reconciles the persisted state.
     *
     * @param jobConfig config whose Flink jobId identifies the job to cancel
     * @throws DataFactoryException MISSING_PARAMS when no jobId is set,
     *                              CANCEL_ERROR when the cluster address cannot be parsed
     */
    public void cancelJobForFlinkByAppId(JobConfig jobConfig) {
        if (StringUtils.isEmpty(jobConfig.getJobId())) {
            throw new DataFactoryException(ErrorCode.MISSING_PARAMS, "jobId");
        }
        String url;
        String flinkHttpAddress = localFlinkHttpAddress;
        // Non-LOCAL deployments carry the cluster address in flinkRunConfig ("-m host:port").
        if (!DeployModeEnum.LOCAL.name().equals(jobConfig.getDeployMode())) {
            flinkHttpAddress = CommandUtils.parseFlinkHttpAddress(jobConfig.getFlinkRunConfig());
            if (flinkHttpAddress == null) {
                log.error("解析flink集群地址出错，请检查是否-m指定flink集群地址");
                throw new DataFactoryException(ErrorCode.CANCEL_ERROR);
            }
        }
        // Prepend a scheme when the configured address is a bare host[:port].
        if (flinkHttpAddress.startsWith("http://") || flinkHttpAddress.startsWith("https://")) {
            url = HttpUtils.buildUrl(flinkHttpAddress, "/jobs/" + jobConfig.getJobId() + "/yarn-cancel");
        } else {
            url = HttpUtils.buildUrl("http://" + flinkHttpAddress, "/jobs/" + jobConfig.getJobId() + "/yarn-cancel");
        }
        log.info(String.format("[cancelJobForFlinkByAppId]请求参数，jobId=%s, url=%s", jobConfig.getJobId(), url));
        String res = "";
        try {
            res = HttpUtils.buildRestTemplate(HttpUtils.TIME_OUT_1_M).getForObject(url, String.class);
        } catch (Exception e) {
            // Currently only 404s land here; left as best-effort pending feedback.
            // FIX: pass the exception as the throwable argument so the stack
            // trace is logged, instead of formatting it into a {} placeholder.
            log.error("任务取消失败", e);
        }
        log.info(String.format("[cancelJobForFlinkByAppId]请求参数结果: res=%s", res));
    }


    /**
     * Counts existing jobs with the given name (excluding the record itself
     * when updating).
     *
     * NOTE(review): the duplicate branch body is empty, so this check is
     * currently a no-op — presumably it should throw a duplicate-name error;
     * confirm the intended ErrorCode before enabling it.
     *
     * @param jobName name to check for duplicates
     * @param id      id of the record being updated, or null when creating
     */
    private void checkJobName(String jobName, String id) {
        LambdaQueryWrapper<JobConfig> wrapper = new LambdaQueryWrapper<>();
        long count;
        if (id == null) {
            // Creating: any job with the same name counts as a duplicate.
            count = this.count(wrapper.eq(JobConfig::getJobName, jobName));
        } else {
            // Updating: ignore the record being updated itself.
            count = this.count(wrapper.eq(JobConfig::getJobName, jobName).ne(JobConfig::getId, id));
        }
        if (count >= 1) {

        }
    }

    /**
     * Validates that a job may be started: it must exist and must not already
     * be RUNNING or STARTING.
     *
     * @param jobConfig the job configuration to validate
     * @throws DataFactoryException JOB_DOES_NOT_EXISTS / JOB_STATUS_ERROR
     */
    public void checkStart(JobConfig jobConfig) {
        if (jobConfig == null) {
            throw new DataFactoryException(ErrorCode.JOB_DOES_NOT_EXISTS);
        }
        JobConfigStatus current = JobConfigStatus.getJobConfigStatus(jobConfig.getStatus());
        boolean alreadyActive = JobConfigStatus.RUNNING.equals(current)
                || JobConfigStatus.STARTING.equals(current);
        if (alreadyActive) {
            throw new DataFactoryException(ErrorCode.JOB_STATUS_ERROR);
        }
    }


    /**
     * Persists a fresh run-log row (status INITIALIZING) for the given job and
     * returns the generated log id.
     */
    public String insertJobRunLog(JobConfig jobConfig) {
        JobRunLog runLog = new JobRunLog();
        runLog.setJobConfigId(jobConfig.getId());
        runLog.setJobId(jobConfig.getJobId());
        runLog.setJobName(jobConfig.getJobName());
        runLog.setDeployMode(jobConfig.getDeployMode());
        runLog.setJobStatus(JobRunStatus.INITIALIZING.name());
        runLog.setLocalLog("任务启动中，请稍后刷新查看日志");
        jobRunLogService.save(runLog);
        return runLog.getId();
    }






    /**
     * Reconciles a single job's local status with the Flink cluster's view.
     * When the jobId cannot be found on the cluster (after retries) the job is
     * marked FAILED and its jobId cleared; when the cluster reports a different
     * state, the local status is updated to match (transitional states map to
     * STARTING, unknown states to RUNNING). A missing jobId on an active job
     * is treated as abnormal and mapped to STOP.
     *
     * @param jobConfigDTO the job being checked (status RUNNING/STARTING)
     * @param jobRunLogDTO the job's run log (currently unused here; the
     *                     log-sync update is commented out pending review)
     */
    private void checkStandalone(JobConfigDTO jobConfigDTO, JobRunLogDTO jobRunLogDTO) {
        // FIX: use a primitive counter — no need for a boxed Integer here.
        int retry = RETRY;
        // If a Flink-side jobId exists, look the job up on the cluster.
        if (StringUtils.isNotEmpty(jobConfigDTO.getJobId())) {
            JobStandaloneInfoDTO jobStandaloneInfoDTO;
            String flinkHttpAddress = localFlinkHttpAddress;
            if (!DeployModeEnum.LOCAL.name().equals(jobConfigDTO.getDeployModeEnum().name())) {
                flinkHttpAddress = CommandUtils.parseFlinkHttpAddress(jobConfigDTO.getFlinkRunConfig());
            }
            jobStandaloneInfoDTO = getJobInfoForStandaloneByAppId(jobConfigDTO.getJobId(),
                    flinkHttpAddress);

            while ((jobStandaloneInfoDTO == null || jobStandaloneInfoDTO.getErrors() != null) && retry-- > 0) {
                log.info("查询flink端任务信息出错,重试中");
                jobStandaloneInfoDTO = getJobInfoForStandaloneByAppId(jobConfigDTO.getJobId(),
                        flinkHttpAddress);
            }

            // FIX: also treat a missing state as a failed lookup — the original
            // dereferenced getState() below and would NPE on a null state.
            if (jobStandaloneInfoDTO == null || jobStandaloneInfoDTO.getErrors() != null
                    || StringUtils.isEmpty(jobStandaloneInfoDTO.getState())) {
                log.error("查询flink端任务信息出错");
                jobConfigDTO.setStatus(JobConfigStatus.FAILED);
                // FIX: the original cleared the jobId twice; once is enough.
                jobConfigDTO.setJobId("");
                jobConfigDTO.setEditor("sys_auto");
                updateJobConfigById(jobConfigDTO);
                return;
            }
            // Flink-side state of the job.
            String statusOnFlink = jobStandaloneInfoDTO.getState();
            if (statusOnFlink.equals(jobConfigDTO.getStatus().name())) {
                // Already in sync: nothing to do.
                return;
            }

            // Local and cluster states disagree — adopt the cluster's view.
            log.warn(String.format("本地任务状态flink集群端不一致，jobStandaloneInfo=%s", jobStandaloneInfoDTO));
            for (JobConfigStatus jobConfigStatus : JobConfigStatus.values()) {
                if (statusOnFlink.equals(jobConfigStatus.name())) {
                    jobConfigDTO.setStatus(jobConfigStatus);
                    jobConfigDTO.setEditor("sys_auto");
                    updateJobConfigById(jobConfigDTO);
                    return;
                }
            }
            // No direct mapping: bucket transitional states into STARTING,
            // everything else into RUNNING.
            log.error(String.format("flink集群端状态为%s，服务端找不到对应的状态", statusOnFlink));
            if (StrUtil.equalsAny(statusOnFlink, JobRunStatus.INITIALIZING.name(), JobRunStatus.RESTARTING.name(), JobRunStatus.CREATED.name())) {
                jobConfigDTO.setStatus(JobConfigStatus.STARTING);
            } else {
                jobConfigDTO.setStatus(JobConfigStatus.RUNNING);
            }
            updateJobConfigById(jobConfigDTO);

        }
        // No jobId while RUNNING/STARTING means the job never reached Flink:
        // mark it STOP.
        else {
            log.error(String.format("服务端找不到jobId，且状态为%s，状态异常", jobConfigDTO.getStatus().name()));
            jobConfigDTO.setStatus(JobConfigStatus.STOP);
            jobConfigDTO.setEditor("sys_auto");
            updateJobConfigById(jobConfigDTO);
        }
    }

//    private void insertJobConfigHistory(String id) {
//        JobConfig jobConfig = this.getById(id);
//        if (jobConfig == null) {
//            log.warn("[insertJobConfigHistory] jobConfig is null id:{} ", id);
//            return;
//        }
//        if (JobType.SQL_STREAMING.getCode() == jobConfig.getJobType() ||
//                JobType.SQL_BATCH.getCode() == jobConfig.getJobType()) {
//            jobConfigHistoryService.insertJobConfigHistory(JobConfigHistoryDTO.to(jobConfig));
//        }
//    }


    /**
     * Resolves the Flink web address for a job config. Falls back to the local
     * cluster address when the config is missing or deployed LOCAL; otherwise
     * derives the address from the "-m ip" value in flinkRunConfig.
     */
    @Override
    public URI getUriByConfigId(String configId) {
        JobConfig config = getById(configId);

        // Missing config or LOCAL deployment: use the local cluster address.
        if (Objects.isNull(config) || DeployModeEnum.LOCAL.name().equals(config.getDeployMode())) {
            return URI.create(localFlinkHttpAddress);
        }

        // flinkRunConfig has the form "-m ip": strip the flag, prepend the scheme.
        String address = "http://" + config.getFlinkRunConfig().replace("-m", "").trim();
        return URI.create(address);
    }

    /**
     * For custom-JAR jobs, downloads the job's jar to the udf path and records
     * the local path on the run parameters. Download failures are logged only.
     */
    private void downJar(JobRunParamDTO jobRunParamDTO, @NotNull JobConfig jobConfig) {
        // Only JAR-type jobs need the artifact fetched locally.
        if (JobType.JAR.getCode() != jobConfig.getJobType()) {
            return;
        }
        try {
            String localPath = URLUtils.downLoadFromUrl(jobConfig.getCustomJarUrl(), udfPath);
            jobRunParamDTO.setMainJarPath(localPath);
        } catch (Exception e) {
            log.error("文件下载失败", e);
        }
    }

    /**
     * Persists the outcome of a job submission: with an appId the config goes
     * RUNNING (start time + jobId recorded); without one both config and run
     * log go FAILED. The run log always receives the local log text and the
     * appId (2021.08.30 — run-log jobId sync fix). Failures are logged only.
     *
     * @param jobStatus currently unused; kept for the existing call contract
     */
    private void updateStatusAndLog(JobConfig config, String jobRunLogId,
                                    String jobStatus, String localLog, String appId) {
        try {
            JobRunLog jobRunLog = jobRunLogService.getById(jobRunLogId);
            // Build a sparse update entity touching only the fields we change.
            JobConfig patch = new JobConfig();
            patch.setId(config.getId());
            boolean submitted = !StringUtils.isEmpty(appId);
            if (submitted) {
                patch.setStatus(JobConfigStatus.RUNNING.getCode());
                patch.setLastStartTime(LocalDateTime.now());
                patch.setJobId(appId);
            } else {
                patch.setStatus(JobConfigStatus.FAILED.getCode());
                jobRunLog.setJobStatus(JobRunStatus.FAILED.name());
            }
            updateById(patch);
            jobRunLog.setLocalLog(localLog);
            // Keep the run log's jobId in sync with the submission result.
            jobRunLog.setJobId(appId);
            jobRunLogService.updateById(jobRunLog);
        } catch (Exception e) {
            log.error("异步更新数据失败", e);
        }
    }

    /**
     * Runs the flink CLI submit command, streams its output into the run log,
     * and returns the parsed appId.
     *
     * NOTE(review): stderr is drained only after waitFor(); a process emitting
     * a lot of stderr could block on a full pipe — confirm the CLI's output
     * volume makes this safe.
     *
     * @param command     full CLI command line
     * @param localLog    accumulating log buffer
     * @param jobRunLogId id of the run-log row to update
     * @return the Flink-side appId parsed from stdout
     * @throws Exception when the process exits non-zero or no appId is found
     */
    private String submitJob(String command, StringBuilder localLog, String jobRunLogId) throws Exception {
        log.info("command = " + command);
        localLog.append("启动命令：").append(command).append("\n");
        Process process = Runtime.getRuntime().exec(command);

        // Parse the appId from stdout while mirroring it into the log.
        String appId = this.clearInfoLogStream(process.getInputStream(), localLog, jobRunLogId);
        int exitCode = process.waitFor();
        this.clearLogStream(process.getErrorStream(), localLog);

        if (exitCode != 0) {
            localLog.append("执行异常，rs=").append(exitCode).append("，appId=").append(appId).append("\n");
            jobRunLogService.updateLogById(localLog.toString(), jobRunLogId);
            throw new RuntimeException("执行异常，rs=" + exitCode);
        }
        if (StringUtils.isEmpty(appId)) {
            localLog.append("appId无法获取，")
                    .append("请登陆服务器分别查看flink客户端日志、web日志、集群上任务运行历史日志(如果任务提交成功)")
                    .append("\n");
            jobRunLogService.updateLogById(localLog.toString(), jobRunLogId);
            throw new RuntimeException("appId无法获取");
        }
        jobRunLogService.updateLogById(localLog.toString(), jobRunLogId);
        return appId;
    }

    /**
     * Drains the given stream (the submit process's stderr), mirroring every
     * chunk into the application log and the accumulating local log buffer.
     * The stream is always closed via {@link #close}.
     *
     * @param stream   stream to drain
     * @param localLog accumulating log buffer
     */
    private void clearLogStream(InputStream stream, StringBuilder localLog) {
        BufferedInputStream reader = null;
        try {
            reader = new BufferedInputStream(stream);
            int bytesRead;
            byte[] buffer = new byte[1024];
            while ((bytesRead = reader.read(buffer)) != -1) {
                String result = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
                log.info(result);
                localLog.append("\n").append(result).append("\n");
            }
        } catch (Exception e) {
            // FIX: use the logger instead of printStackTrace(), so the failure
            // lands in the application log with full context.
            log.error("[clearLogStream] 读取流失败", e);
        } finally {
            this.close(reader, stream, "clearLogStream");
        }
    }

    /**
     * Reads the submit process's stdout, mirroring it into {@code localLog},
     * flushing the accumulated log to the run-log record at most every 2
     * seconds, and parsing the Flink appId from "job-submitted-success:" /
     * "Job has been submitted with JobID" marker lines. Returns early once an
     * appId has been parsed.
     *
     * @param stream      process stdout stream
     * @param localLog    accumulating log buffer
     * @param jobRunLogId id of the run-log row to update
     * @return the Flink-side job id, or null when the stream ends without one
     */
    private String clearInfoLogStream(InputStream stream, StringBuilder localLog, String jobRunLogId) {
        String appId = null;
        BufferedInputStream reader = null;
        try {
            long lastTime = System.currentTimeMillis();
            byte[] buffer = new byte[1024];
            int bytesRead;
            reader = new BufferedInputStream(stream);
            while ((bytesRead = reader.read(buffer)) != -1) {
                // An appId was parsed during the previous iteration — stop reading.
                if (!StringUtils.isEmpty(appId)) {
                    return appId;
                }
                String[] msg = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8).split("\n");
                int index = msg.length - 1;
                if (index < 0) {
                    continue;
                }
                // NOTE(review): only the LAST line of each 1024-byte read chunk
                // is inspected, and a read may split a line across chunks — a
                // marker line that is not last in its chunk could be missed;
                // confirm the flink CLI's output pattern makes this safe.
                String result = msg[index];
                if (StringUtils.isEmpty(appId) && result.contains("job-submitted-success:")) {
                    appId = result.replace("job-submitted-success:", "").trim();
                    localLog.append("[job-submitted-success] 解析得到的appId是:").append(appId).append("\n");
                }
                if (StringUtils.isEmpty(appId) && result.contains("Job has been submitted with JobID")) {
                    appId = result.replace("Job has been submitted with JobID", "").trim();
                    localLog.append("[Job has been submitted with JobID] 解析得到的appId是:").append(appId).append("\n");
                }
                localLog.append(result).append("\n");
                // Flush the accumulated log to the DB at most once every 2s.
                if (System.currentTimeMillis() >= lastTime + 2000L) {
                    jobRunLogService.updateLogById(localLog.toString(), jobRunLogId);
                    lastTime = System.currentTimeMillis();
                }
            }
            log.info(String.format("获取到的appId是 %s  \n", appId));
            return appId;
        } catch (Exception e) {
            log.error("[clearInfoLogStream] is error", e);
            throw new RuntimeException("clearInfoLogStream is error");
        } finally {
            this.close(reader, stream, "clearInfoLogStream");
        }
    }

    /**
     * Quietly closes the reader and the underlying stream, logging each step
     * and finally the wait-for thread pool's state.
     *
     * NOTE(review): the ordering is asymmetric — the reader is closed before
     * its log line, but the stream's log line is written before the close;
     * presumably unintentional, but harmless. Closing the reader normally
     * already closes the wrapped stream.
     *
     * @param reader   buffered wrapper around {@code stream}, may be null
     * @param stream   underlying stream, may be null
     * @param typeName caller tag used in the log messages
     */
    private void close(BufferedInputStream reader, InputStream stream, String typeName) {
        if (reader != null) {
            try {
                reader.close();
                log.info(String.format("[%s]关闭reader ", typeName));
            } catch (IOException e) {
                log.error(String.format("[%s] 关闭reader流失败 ", typeName), e);
            }
        }
        if (stream != null) {
            try {
                log.info(String.format("[%s]关闭stream ", typeName));
                stream.close();
            } catch (IOException e) {
                log.error(String.format("[%s] 关闭stream流失败 ", typeName), e);
            }
        }
        log.info("线程池状态: " + WaitForPoolConfig.getInstance().getThreadPoolExecutor());
    }

    /**
     * Saves a visual (drag-and-drop) FlinkSQL job configuration: serializes the
     * component DAG, pre-validates it, generates and pre-checks the Flink SQL,
     * then persists the config. Validation/parsing failures still persist the
     * config (marked unchecked) and are reported via the response code.
     *
     * @param form form data holding the component DAG
     * @return response carrying the new config id plus a precheck result code/message
     */
    // FIX: Javadoc moved above @Override — placed after the annotation it was
    // not attached to the method declaration.
    @Override
    public RespDTO<Map> saveViewable(JobViewable form) {
        JobConfig jobConfig = new JobConfig();
        BeanUtils.copyProperties(form, jobConfig);
        // No components yet: persist as unchecked and report success.
        if (form.getComponents() == null || form.getComponents().isEmpty()) {
            jobConfig.setIsChecked(JobConstant.NO_CHECKED);
            this.save(jobConfig);
            return RespDTO.onSuc(ImmutableMap.of("id", jobConfig.getId(),
                    "code", 0,
                    "message", "成功"));
        }
        // Serialize the component DAG for later editing.
        String configJson = JSON.toJSONString(form.getComponents());
        jobConfig.setViewableConfig(configJson);
        // Pre-validate the components before attempting SQL generation.
        List<JobComponent> components = form.getComponents();
        boolean flag = preCheck(components);
        if (!flag) {
            jobConfig.setIsChecked(JobConstant.NO_CHECKED);
            this.save(jobConfig);
            return RespDTO.onSuc(ImmutableMap.of("id", jobConfig.getId(),
                    "code", ErrorCode.PRECHECK_ERROR.getCode(),
                    "message", ErrorCode.PRECHECK_ERROR.getMessage()));
        }
        setFromExtJarPath(jobConfig, components);
        try {
            // Assemble the Flink SQL from the DAG and pre-check each statement.
            String sql = parse2SQL(components);
            if (sql != null) {
                List<String> sqls = SqlValidationUtils.toSqlList(sql);
                SqlValidationUtils.preCheckSql(sqls);
                jobConfig.setFlinkSql(sql);
                jobConfig.setIsChecked(IS_CHECKED);
            }
        } catch (Exception e) {
            flag = false;
            jobConfig.setIsChecked(NO_CHECKED);
            // FIX: pass the exception as the throwable argument so the stack
            // trace is logged, instead of formatting it into a {} placeholder.
            log.error("parse SQL 失败", e);
        }
        if (!flag) {
            this.save(jobConfig);
            return RespDTO.onSuc(ImmutableMap.of("id", jobConfig.getId(),
                    "code", ErrorCode.PRECHECK_ERROR.getCode(),
                    "message", ErrorCode.PRECHECK_ERROR.getMessage()));
        }
        this.save(jobConfig);
        return RespDTO.onSuc(ImmutableMap.of("id", jobConfig.getId(),
                "code", 0,
                "message", "成功"));
    }

    /**
     * Updates a visual FlinkSQL job configuration: re-serializes the component
     * DAG, pre-validates it, regenerates and pre-checks the Flink SQL, then
     * persists the changes. Failed configs are still persisted (marked
     * unchecked) before the exception is thrown — hence noRollbackFor.
     *
     * @param form form data holding the component DAG and the config id
     * @throws DataFactoryException PRECHECK_ERROR when component validation fails,
     *                              PARSE_ERROR when SQL generation/pre-check fails
     */
    @Override
    @Transactional(noRollbackFor = DataFactoryException.class)
    public void updateViewable(JobViewable form) {
        JobConfig jobConfig = new JobConfig();
        BeanUtils.copyProperties(form, jobConfig);
        jobConfig.setId(form.getJobConfigId());
        // No components: just persist the basic fields.
        if (form.getComponents() == null || form.getComponents().isEmpty()) {
            this.updateById(jobConfig);
            return;
        } else {
            boolean flag = true;
            String configJson = JsonMapper.toAlwaysJson(form.getComponents());
            jobConfig.setViewableConfig(configJson);
            List<JobComponent> components = form.getComponents();

            // Pre-validate the components before attempting SQL generation.
            flag = preCheck(components);
            if (!flag) {
                jobConfig.setIsChecked(JobConstant.NO_CHECKED);
                this.updateById(jobConfig);
                throw new DataFactoryException(ErrorCode.PRECHECK_ERROR);
            }
            setFromExtJarPath(jobConfig, components);
            try {
                // Assemble the Flink SQL from the DAG and pre-check each statement.
                String sql = parse2SQL(components);
                if (sql != null) {
                    List<String> sqls = SqlValidationUtils.toSqlList(sql);
                    SqlValidationUtils.preCheckSql(sqls);
                    jobConfig.setFlinkSql(sql);
                    jobConfig.setIsChecked(IS_CHECKED);
                }
            } catch (Exception e) {
                jobConfig.setIsChecked(NO_CHECKED);
                flag = false;
                // FIX: pass the exception as the throwable argument so the stack
                // trace is logged, instead of formatting it into a {} placeholder.
                log.error("parse SQL 失败", e);
            }
            if (!flag) {
                this.updateById(jobConfig);
                throw new DataFactoryException(ErrorCode.PARSE_ERROR);
            }
        }
        // FIX: parameterize the previously-raw LambdaUpdateWrapper type.
        LambdaUpdateWrapper<JobConfig> update = Wrappers.<JobConfig>lambdaUpdate().eq(JobConfig::getId, jobConfig.getId())
                .set(JobConfig::getFlinkRunConfig, jobConfig.getFlinkRunConfig());
        this.update(jobConfig, update);
    }

    /**
     * Previews the Flink SQL that would be generated for a component DAG,
     * returning null when the components fail pre-validation.
     */
    @Override
    public String preview(List<JobComponent> components) {
        return preCheck(components) ? parse2SQL(components) : null;
    }

    /**
     * Pre-validates the component DAG: datasource components must pass the
     * Update validation group and have at most one input; ETL (data-sync)
     * components must pass the Insert group and have exactly one input and one
     * output. Returns false (logging the reason) on the first violation.
     *
     * @param components the DAG's components
     * @return true when every component is valid
     */
    private boolean preCheck(List<JobComponent> components) {
        for (JobComponent c : components) {
            if (isDatasource(c)) {
                ViewableConfig config = c.getConfig();
                BeanPropertyBindingResult result = new BeanPropertyBindingResult(config, "config");
                validator.validate(config, result, Update.class);
                if (result.hasErrors()) {
                    String res = result.getFieldErrors().stream().map(error -> error.getField() + ": " + error.getDefaultMessage()).collect(Collectors.joining("; "));
                    log.error("组件参数校验错误{}", res);
                    return false;
                }
                // A datasource may feed several downstream nodes but accept at most one input.
                if (Optional.ofNullable(c.getSourceIds()).orElse(new ArrayList<>()).size() > 1) {
                    log.error("组件规范校验错误: 数据源组件仅支持单输入多输出模式");
                    return false;
                }
            }

            if (isETL(c)) {
                ViewableConfig config = c.getConfig();
                BeanPropertyBindingResult result = new BeanPropertyBindingResult(config, "config");
                validator.validate(config, result, Insert.class);
                if (result.hasErrors()) {
                    String res = result.getFieldErrors().stream().map(error -> error.getField() + ": " + error.getDefaultMessage()).collect(Collectors.joining("; "));
                    log.error("组件参数校验错误{}", res);
                    return false;
                }
                // FIX: was &&, which only failed when BOTH counts were wrong;
                // the rule ("single input, single output") requires EITHER
                // violation to fail — parse2SQL also indexes get(0) on both lists.
                if (Optional.ofNullable(c.getSourceIds()).orElse(new ArrayList<>()).size() != 1
                        || Optional.ofNullable(c.getTargetIds()).orElse(new ArrayList<>()).size() != 1) {
                    log.error("组件规范校验错误: 数据同步组件仅支持单输入单输出切为中间节点");
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Finds a node of the DAG by its nodeId.
     *
     * @param components all components of the DAG
     * @param nodeId     id of the node to look up
     * @return the matching component, or null when no node has that id
     */
    private JobComponent getComponent(List<JobComponent> components, String nodeId) {
        // FIX: plain stream() — parallelStream() buys nothing for a trivial
        // filter over a small in-memory list and only adds fork/join overhead.
        return components.stream()
                .filter(com -> nodeId.equals(com.getNodeId()))
                .findFirst()
                .orElse(null);
    }

    /**
     * Returns whether the component is a datasource node, i.e. its component
     * type lies below the data-sync threshold. Null-safe.
     *
     * @param c component to classify, may be null
     * @return true when the component is a datasource node
     */
    public boolean isDatasource(JobComponent c) {
        // FIX: return the condition directly instead of if/return true/return false.
        return c != null && c.getComponentType() < DATA_SYNC_TYPE;
    }

    /**
     * Returns whether the component is an ETL (data-sync) node, i.e. its
     * component type is at or above the data-sync threshold. Null-safe.
     *
     * @param c component to classify, may be null
     * @return true when the component is an ETL node
     */
    public boolean isETL(JobComponent c) {
        // FIX: return the condition directly instead of if/return true/return false.
        return c != null && c.getComponentType() >= JobConstant.DATA_SYNC_TYPE;
    }


    /**
     * <p>
     * 根据数据源信息填充 flinksql with 字段后面的配置信息
     * </p>
     *
     * @return
     * @throws
     * @author zhan9yn
     * @date 2021/12/16 2:34 下午
     */

    private final SmartValidator validator;

    /**
     * Fills the connection details (host, port, database, credentials) into the
     * FlinkSQL WITH-clause template for a datasource component, leaving a %s
     * placeholder for the table name (resolved later from the downstream node).
     *
     * @param c datasource component
     * @return the partially formatted source template, or null for non-datasource nodes
     * @throws DataFactoryException PRECHECK_ERROR when a sqlserver database name
     *                              is not of the form "database.schema"
     */
    private String extractDatasource(JobComponent c) {

        if (!isDatasource(c)) {
            return null;
        }
        String sourceTemplate = FlinkSQLTemplate.getSQL(c.getComponentType());

        ViewableConfig config = c.getConfig();
        if (Objects.equals(c.getComponentType(), MYSQL_TYPE)) {
            sourceTemplate = String.format(sourceTemplate, config.getIp(), config.getPort(),
                    config.getDatabaseName(), config.getUserName(), config.getPassword(), "%s");
        }
        if (Objects.equals(c.getComponentType(), ORACLE_TYPE)) {
            sourceTemplate = String.format(sourceTemplate, config.getIp(), config.getPort(),
                    config.getBasicValue(), config.getUserName(), config.getPassword(),
                    // The oracle connector's table field is "database.table";
                    // fill in the database now — the table is only known once the
                    // downstream node is resolved.
                    config.getDatabaseName() + ".%s");
        }
        if (Objects.equals(c.getComponentType(), CLICKHOUSE_TYPE)) {
            sourceTemplate = String.format(sourceTemplate, config.getIp(), config.getPort(),
                    config.getDatabaseName(), config.getUserName(), config.getPassword(), "%s");
        }
        if (Objects.equals(c.getComponentType(), SQLSERVER_TYPE)) {
            // FIX: the original tested contains("\\.") — a literal backslash-dot
            // that never occurs in a database name — and then indexed
            // split("\\.")[1] unconditionally, throwing
            // ArrayIndexOutOfBoundsException when the name lacks the
            // "database.schema" separator. Validate the split result instead.
            String[] parts = config.getDatabaseName().split("\\.");
            if (parts.length < 2) {
                throw new DataFactoryException(ErrorCode.PRECHECK_ERROR, "sqlserver 组件的数据库名称不合法");
            }
            String database = parts[0];
            String schema = parts[1];
            sourceTemplate = String.format(sourceTemplate, config.getIp(), config.getPort(),
                    database, config.getUserName(), config.getPassword(), schema, "%s");
            // Leaf sqlserver nodes drop the druid validation-query line.
            if (c.getTargetIds() == null || c.getTargetIds().isEmpty()) {
                sourceTemplate = sourceTemplate.replace("'druid.validation-query'='select 1',\n", "");
            }

        }
        if (Objects.equals(c.getComponentType(), DM_TYPE)) {
            sourceTemplate = String.format(sourceTemplate, config.getIp(), config.getPort(),
                    config.getDatabaseName(), config.getUserName(), config.getPassword(), "%s");
        }
        return sourceTemplate;
    }

    /**
     * Translates a job's DAG components into a complete Flink SQL script.
     *
     * <p>Datasource nodes contribute connector DDL templates (keyed by node id);
     * each ETL node then stitches together a source CREATE TABLE, a target
     * CREATE TABLE and an INSERT/sync statement from its column mappings.</p>
     *
     * @param components the DAG nodes of the job; may be empty/null
     * @return the concatenated Flink SQL script, or {@code null} when there are no components
     */
    private String parse2SQL(List<JobComponent> components) {
        if (CollectionUtils.isEmpty(components)) {
            return null;
        }
        // Connector DDL template per datasource node, keyed by node id.
        Map<String, String> datasourceTemplates = new HashMap<>();
        for (JobComponent component : components) {
            if (isDatasource(component)) {
                datasourceTemplates.put(component.getNodeId(), extractDatasource(component));
            }
        }

        // Walk the ETL (transform) nodes and assemble the full script.
        StringBuilder script = new StringBuilder();
        for (JobComponent etl : components) {
            if (!isETL(etl)) {
                continue;
            }
            String upstreamId = etl.getSourceIds().get(0);
            String downstreamId = etl.getTargetIds().get(0);
            JobComponent upstream = getComponent(components, upstreamId);
            JobComponent downstream = getComponent(components, downstreamId);
            ViewableConfig config = etl.getConfig();

            // Column definition segments ("`col` TYPE,\n" per mapping) for both tables.
            StringBuilder sourceCols = new StringBuilder();
            StringBuilder targetCols = new StringBuilder();
            for (Mapping mapping : config.getMappings()) {
                sourceCols.append('`').append(mapping.getSourceCol().trim()).append('`')
                        .append(' ')
                        .append(TypeConvert.getType(upstream.getComponentType(), mapping.getSourceColType()))
                        .append(",\n");
                targetCols.append('`').append(mapping.getTargetCol().trim()).append('`')
                        .append(' ')
                        .append(TypeConvert.getType(downstream.getComponentType(), mapping.getTargetColType()))
                        .append(",\n");
            }
            String sourceTb = String.format(JobConstant.TABLE_NAME_TEMPLATE, "source", config.getSourceTb(), etl.getNodeId());
            String targetTb = String.format(JobConstant.TABLE_NAME_TEMPLATE, "target", config.getTargetTb(), etl.getNodeId());
            // Strip the trailing ",\n" before substituting the columns into the DDL template.
            String sourceCreate = String.format(FlinkSQLTemplate.CREATE_TABLE, sourceTb, sourceCols.substring(0, sourceCols.lastIndexOf(",")));
            String targetCreate = String.format(FlinkSQLTemplate.CREATE_TABLE, targetTb, targetCols.substring(0, targetCols.lastIndexOf(",")));
            // Fill the remaining %s (table name) of each connector template, prefixed by its CREATE TABLE.
            String sourceSQL = sourceCreate + String.format(datasourceTemplates.get(upstreamId), config.getSourceTb());
            String targetSQL = targetCreate + String.format(datasourceTemplates.get(downstreamId), config.getTargetTb());
            String syncSQL = String.format(FlinkSQLTemplate.getSQL(etl.getComponentType()), targetTb, sourceTb);
            script.append(sourceSQL).append(targetSQL).append(syncSQL);
        }
        return script.toString();
    }

    /**
     * Pings the target Flink server before a job submission.
     *
     * <p>On ping failure the job config is marked FAILED, the run log (if a log id
     * was supplied) is marked FAILED with the failure reason, and the original
     * exception is rethrown to abort the submission.</p>
     *
     * @param jobConfig the job whose Flink run config supplies the server address
     * @param jobLogId  id of the run-log row to mark failed; may be blank
     * @throws DataFactoryException when the Flink server cannot be reached
     */
    private void flinkPingTest(JobConfig jobConfig, String jobLogId) {
        String flinkHttpAddress = CommandUtils.parseFlinkHttpAddress(jobConfig.getFlinkRunConfig());
        if (flinkHttpAddress == null) {
            // Fall back to the locally configured Flink address when the run config has none.
            flinkHttpAddress = localFlinkHttpAddress;
        }
        // A single "http" prefix check covers both http:// and https:// schemes.
        String uri = flinkHttpAddress.startsWith("http") ? flinkHttpAddress : "http://" + flinkHttpAddress;
        try {
            FlinkUtil.ping(uri, true);
        } catch (DataFactoryException e) {
            // Ping failed: mark the job config as failed.
            jobConfig.setStatus(JobConfigStatus.FAILED.getCode());
            this.updateById(jobConfig);
            // When a run-log id was supplied, mark that run as failed too.
            if (StrUtil.isNotBlank(jobLogId)) {
                JobRunLog jobRunLog = jobRunLogService.getById(jobLogId);
                // Guard against a stale/unknown log id (getById may return null).
                if (jobRunLog != null) {
                    jobRunLog.setJobStatus(JobRunStatus.FAILED.name());
                    jobRunLog.setLocalLog("提交任务失败，失败原因：" + e.getMessage());
                    jobRunLogService.updateById(jobRunLog);
                }
            }
            throw e;
        }
    }

    /**
     * Submits a prepared job to Flink and records the outcome.
     *
     * <p>JAR jobs are launched through a generated CLI command; SQL jobs are
     * submitted over HTTP via {@code flinkSqlExecutorJarManager}. On success the
     * config/run-log are moved to RUNNING and a background status watcher is
     * started; on failure (exception or empty appId) they are marked FAILED.</p>
     *
     * @param jobRunParamDTO resolved runtime parameters for the launch
     * @param jobConfig      the job configuration being launched
     * @param jobRunLogId    id of the run-log row to update with progress and status
     */
    private void readyToSubmitJob(JobRunParamDTO jobRunParamDTO, JobConfig jobConfig, String jobRunLogId) {
        String jobStatus = JobRunStatus.SUCCESS.name();
        String appId = "";
        boolean success = true;
        StringBuilder localLog = new StringBuilder()
                .append("开始提交任务：")
                .append(DateUtil.now()).append("\n")
                .append("三方jar: ").append("\n")
                .append(jobConfig.getExtJarPath())
                .append("\n");

        try {
            String command;

            // In custom-JAR submission mode, download the jar file(s) to the local host first.
            downJar(jobRunParamDTO, jobConfig);
            // JAR jobs keep the original CLI flow; SQL jobs are submitted over HTTP.
            if (JobType.JAR.getCode() == jobConfig.getJobType()) {
                command = CommandUtils.buildRunCommandForCluster(jobRunParamDTO, jobConfig, clientHome);
                log.info("command :\n{}", command);
                // Submit via the CLI command; returns the Flink application id.
                appId = submitJob(command, localLog, jobRunLogId);
            } else {
                List<String> classPaths = Lists.newArrayList();
                String extJarPath = jobConfig.getExtJarPath();
                if (StrUtil.isNotBlank(extJarPath)) {
                    // Ext jar paths are stored comma-separated.
                    classPaths = StrUtil.split(extJarPath,StrPool.COMMA);
                }
                String serviceAddress="";
                int parallelism=1;
                if(StrUtil.isNotBlank(jobConfig.getFlinkRunConfig())){
                    // "-m" carries the Flink service address, "-s" the parallelism.
                    serviceAddress=StrUtils.getParamByPattern(StrUtils.getPatternByFlag("m"),jobConfig.getFlinkRunConfig());
                    String parallelismString=StrUtils.getParamByPattern(StrUtils.getPatternByFlag("s"),jobConfig.getFlinkRunConfig());
                    if(StrUtil.isNotBlank(parallelismString)){
                        parallelism = Integer.parseInt(parallelismString);
                    }
                    // A non-blank run config must yield at least one of the two settings.
                    if(StrUtil.isBlank(serviceAddress)&&StrUtil.isBlank(parallelismString)){
                        throw new DataFactoryException(ErrorCode.FLINK_CONFIG_ERROR);
                    }
                }
                appId = flinkSqlExecutorJarManager.submitSqlJob(jobConfig.getFlinkSql(), classPaths,serviceAddress, parallelism, localLog, jobRunLogId);
            }
        } catch (Exception e) {
            // Any failure thrown while submitting; no appId could be obtained.
            log.error("由于未知错误，无法获取appId: ", e);
            localLog.append(e);
            success = false;
            jobStatus = JobRunStatus.FAILED.name();
        } finally {
            localLog.append("\n启动结束时间: ").append(DateUtil.now()).append("\n");
            if (success && StringUtils.isNotEmpty(appId)) {
                // No exception was thrown, but appId could still be empty on a partial failure,
                // hence the extra isNotEmpty check above.
                localLog.append("启动成功");
                updateStatusAndLog(jobConfig, jobRunLogId, JobRunStatus.RUNNING.name(), localLog.toString(), appId);
                String finalAppId = appId;
                // Hand off to a background watcher that keeps status and logs up to date.
                statusExecutor.execute(new StatusAndLogHandler(new StringBuffer(localLog.toString()), finalAppId,
                        jobConfig, jobRunLogId, loginUserManager.getCurrentLoginUser()));
            } else {
                localLog.append("启动失败");
                updateStatusAndLog(jobConfig, jobRunLogId, jobStatus, localLog.toString(), appId);
            }
        }
    }

    /**
     * Builds the run-parameter object for a job launch.
     *
     * <p>The SQL is no longer written to a local file before submission, so the
     * sql file path passed to the builder is always empty.</p>
     *
     * @param jobConfig the job configuration to build parameters from
     * @return the assembled run-parameter DTO
     */
    private JobRunParamDTO getJobRunParamDTO(JobConfig jobConfig) {
        return JobRunParamDTO.buildJobRunParam(jobConfig, "", clientHome);
    }

    /**
     * Fills in the submitted form's resource (jar) info when it is absent.
     *
     * <p>If the form carries no ext-jar paths, collects the default driver
     * resource URL for every database-type node in the DAG, then appends the
     * common flinkx dependency, and joins them comma-separated onto the config.</p>
     *
     * @param jobConfig  the job configuration to populate
     * @param components the DAG nodes whose types determine the needed drivers
     */
    private void setFromExtJarPath(JobConfig jobConfig, List<JobComponent> components) {
        // Only fill defaults when the form supplied nothing and there are nodes to inspect.
        if (StrUtil.isNotBlank(jobConfig.getExtJarPath()) || CollectionUtil.isEmpty(components)) {
            return;
        }
        Set<String> jarPaths = Sets.newHashSet();
        for (JobComponent component : components) {
            boolean databaseComponent = NumberUtil.equals(component.getComponentType(), MYSQL_TYPE)
                    || NumberUtil.equals(component.getComponentType(), ORACLE_TYPE)
                    || NumberUtil.equals(component.getComponentType(), CLICKHOUSE_TYPE)
                    || NumberUtil.equals(component.getComponentType(), SQLSERVER_TYPE)
                    || NumberUtil.equals(component.getComponentType(), DM_TYPE);
            if (!databaseComponent) {
                continue;
            }
            // Default driver resource for this connector type under the default tenant.
            jarPaths.add(
                    InitiallyUtils.markInitially(
                            () -> resourceService.getOne(Wrappers.lambdaQuery(Resource.class).eq(Resource::getConnectType, component.getComponentType()).eq(Resource::getTenantId, DEFAULT_TENANT_ID))
                    ).getUrl()
            );
        }
        // Always include the shared flinkx dependency.
        jarPaths.add(
                InitiallyUtils.markInitially(
                        () -> resourceService.getOne(Wrappers.lambdaQuery(Resource.class).eq(Resource::getName, FLINK_COMMON_LIB_NAME).eq(Resource::getTenantId, DEFAULT_TENANT_ID))
                ).getUrl()
        );
        jobConfig.setExtJarPath(StrUtil.join(StrPool.COMMA, jarPaths));
    }
}
